package sparkcore.day6.lesson03

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

/**
  * Spark job that ranks product categories by their click, order and pay counts.
  *
  * Pipeline:
  *  1. collect every category id that appears in the click / order / pay columns;
  *  2. count clicks, orders and payments per category;
  *  3. left-join the three counts onto the id list, producing one
  *     `category_id=..|click..=..|order..=..|pay..=..` string per category;
  *  4. sort by (click, order, pay) descending via [[Sortkey]] and print the result.
  *
  * Input lines are comma separated; column 7 holds a single clicked category id,
  * columns 9 and 11 hold lists of ordered / paid category ids.
  */
object SparkTopN {

  /** Input used when no path is given on the command line. */
  private val DefaultInputPath = "D:\\1711班\\第十二天\\资料\\meituan.txt"

  /** Zero-based column positions inside a comma separated input line. */
  private val ClickFieldIndex = 7
  private val OrderFieldIndex = 9
  private val PayFieldIndex = 11

  /** Regex matching the literal `^A` that separates ids inside a list column. */
  private val IdSeparatorRegex = "\\^A"

  /**
    * Entry point.
    *
    * @param args optional; `args(0)` overrides the input file path. With no arguments the
    *             built-in default path is read, as before.
    */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(s"${Constants.SPARK_APP_NAME}").setMaster("local")
    val sc = new SparkContext(conf)
    // try/finally so the context is released even when a step fails.
    try {
      val inputPath = args.headOption.getOrElse(DefaultInputPath)
      // The same lines feed four separate scans below; cache them to avoid re-reading the file.
      val rdd: RDD[String] = sc.textFile(inputPath).cache()

      // Step 1: every category id that occurs anywhere in the input.
      val allCategoryid = getAllCategoryID(rdd).distinct()

      // Step 2: per-category click, order and pay counts.
      val clickCategoryCount = getClickCategoryCount(rdd)
      val orderCategoryCount = getOrderCategoryCount(rdd)
      val payCategoryCount = getPayCategoryCount(rdd)

      // Step 3: left-join the counts onto the full id list.
      val resultRDD: RDD[(Long, String)] =
        leftJoinData(allCategoryid, clickCategoryCount, orderCategoryCount, payCategoryCount)

      // Step 4: secondary sort and print.
      getTopN(resultRDD)
    } finally {
      sc.stop()
    }
  }

  /**
    * Returns the trimmed content of column `index`, or `None` when the column is
    * missing or blank.
    *
    * `split` is called with limit -1 so that trailing empty columns are kept; with the
    * default limit they are dropped and indexing a blank trailing column would throw.
    */
  private def fieldAt(line: String, index: Int): Option[String] = {
    val fields = line.split(",", -1)
    if (index < fields.length) Some(fields(index).trim).filter(_.nonEmpty) else None
  }

  /**
    * Parses the id list stored in column `index`, splitting it with `separatorRegex`.
    * Blank entries are skipped; a non-numeric entry still fails with
    * `NumberFormatException`, as it did before.
    */
  private def categoryIdsAt(line: String, index: Int, separatorRegex: String): Seq[Long] =
    fieldAt(line, index).fold(Seq.empty[Long]) { field =>
      field.split(separatorRegex).toSeq.map(_.trim).filter(_.nonEmpty).map(_.toLong)
    }

  /**
    * Collects all category ids referenced by the click, order and pay columns.
    *
    * The ids are built per line from immutable values. Sharing one mutable set across
    * the closure would make every line re-emit all ids seen so far in the task.
    *
    * @param rdd raw input lines
    * @return `(categoryId, categoryId)` pairs; ids are de-duplicated within a line only,
    *         so callers wanting a unique list still need `distinct()`
    */
  def getAllCategoryID(rdd: RDD[String]): RDD[(Long, Long)] = {
    rdd.flatMap { line =>
      val clickIds = fieldAt(line, ClickFieldIndex).map(_.toLong).toSeq
      val orderIds = categoryIdsAt(line, OrderFieldIndex, IdSeparatorRegex)
      // NOTE(review): getPayCategoryCount splits this column with Constants.SPLIT_FIELDS
      // instead of the literal used here; confirm both describe the same separator.
      val payIds = categoryIdsAt(line, PayFieldIndex, IdSeparatorRegex)
      (clickIds ++ orderIds ++ payIds).distinct.map(id => (id, id))
    }
  }

  /**
    * Counts clicks per category.
    *
    * @param rdd raw input lines
    * @return `(categoryId, clickCount)` for every category with at least one click
    */
  def getClickCategoryCount(rdd: RDD[String]): RDD[(Long, Long)] = {
    rdd
      .flatMap(line => fieldAt(line, ClickFieldIndex).map(_.toLong).toSeq)
      .map(categoryId => (categoryId, 1L))
      .reduceByKey(_ + _)
  }

  /**
    * Counts orders per category.
    *
    * @param rdd raw input lines
    * @return `(categoryId, orderCount)` for every category with at least one order
    */
  def getOrderCategoryCount(rdd: RDD[String]): RDD[(Long, Long)] = {
    rdd
      .flatMap(line => categoryIdsAt(line, OrderFieldIndex, IdSeparatorRegex))
      .map(categoryId => (categoryId, 1L))
      .reduceByKey(_ + _)
  }

  /**
    * Counts payments per category.
    *
    * @param rdd raw input lines
    * @return `(categoryId, payCount)` for every category with at least one payment
    */
  def getPayCategoryCount(rdd: RDD[String]): RDD[(Long, Long)] = {
    // Built once on the driver; the resulting String is what the closure captures.
    val paySeparatorRegex = s"\\${Constants.SPLIT_FIELDS}"
    rdd
      .flatMap(line => categoryIdsAt(line, PayFieldIndex, paySeparatorRegex))
      .map(categoryId => (categoryId, 1L))
      .reduceByKey(_ + _)
  }

  /**
    * Left-joins `counts` onto `base` and appends `|label=count` to each value,
    * using 0 for categories that have no entry in `counts`.
    */
  private def appendCount(
      base: RDD[(Long, String)],
      counts: RDD[(Long, Long)],
      label: String): RDD[(Long, String)] =
    base.leftOuterJoin(counts).map { case (categoryId, (value, count)) =>
      // 0L keeps the fallback a Long; a bare 0 would widen the expression to AnyVal.
      (categoryId, s"$value|$label=${count.getOrElse(0L)}")
    }

  /**
    * Builds one summary string per category by left-joining the three count RDDs onto
    * the full id list.
    *
    * @param allCategoryID2ID        `(categoryId, categoryId)` for every known category
    * @param clickCategoryidAndCount `(categoryId, clickCount)`
    * @param orderCategoryidAndCount `(categoryId, orderCount)`
    * @param payCategoryidAndCount   `(categoryId, payCount)`
    * @return `(categoryId, value)` where value looks like
    *         `category_id=1|click_category_count=5|order_category_count=4|pay_category_count=1`
    *         (the labels come from `Constants`); missing counts are rendered as 0
    */
  def leftJoinData(
                  allCategoryID2ID:RDD[(Long,Long)],
                  clickCategoryidAndCount:RDD[(Long,Long)],
                  orderCategoryidAndCount:RDD[(Long,Long)],
                  payCategoryidAndCount:RDD[(Long,Long)]
                  ): RDD[(Long,String)] ={
    val idLabel = s"${Constants.CATEGORY_ID}"
    val idOnly: RDD[(Long, String)] =
      allCategoryID2ID.map { case (categoryId, _) => (categoryId, s"$idLabel=$categoryId") }

    val withClicks = appendCount(idOnly, clickCategoryidAndCount, s"${Constants.CLICK_CATEGORY_COUNT}")
    val withOrders = appendCount(withClicks, orderCategoryidAndCount, s"${Constants.ORDER_CATEGORY_COUNT}")
    appendCount(withOrders, payCategoryidAndCount, s"${Constants.PAY_CATEGORY_COUNT}")
  }

  /**
    * Sorts the categories by click, then order, then pay count (all descending) and
    * prints one summary line per category.
    *
    * The sorted values are collected to the driver before printing. Printing inside an
    * RDD `foreach` runs on the executors, where neither the global order nor the output
    * location is guaranteed outside single-threaded local mode.
    *
    * Example output:
    * {{{
    * category_id=1|click_category_count=2|order_category_count=5|pay_category_count=5
    * category_id=2|click_category_count=2|order_category_count=5|pay_category_count=4
    * category_id=4|click_category_count=1|order_category_count=0|pay_category_count=1
    * category_id=3|click_category_count=0|order_category_count=4|pay_category_count=4
    * }}}
    *
    * @param resultRDD `(categoryId, value)` pairs as produced by [[leftJoinData]]
    */
  def getTopN(resultRDD: RDD[(Long, String)]): Unit ={
    resultRDD.map { case (_, value) =>
      // value: category_id=1|click_category_count=5|order_category_count=4|pay_category_count=1
      val parts = value.split("\\|")
      def countAt(position: Int): Long = parts(position).split("=")(1).toLong
      val key = new Sortkey(countAt(1), countAt(2), countAt(3))
      // Only the category id would be needed in a real job; the full value is kept so
      // the printed result shows the counts that drove the ordering.
      (key, value)
    }.sortByKey(ascending = false)
      .values
      .collect()
      .foreach(println)
  }

}
